home *** CD-ROM | disk | FTP | other *** search
/ PC World Komputer 2010 April / PCWorld0410.iso / hity wydania / Ubuntu 9.10 PL / karmelkowy-koliberek-desktop-9.10-i386-PL.iso / casper / filesystem.squashfs / usr / share / pyshared / GDebi / DebPackage.py < prev    next >
Text File  |  2009-09-23  |  22KB  |  553 lines

  1. # Copyright (c) 2005-2009 Canonical Ltd
  2. #
  3. # AUTHOR:
  4. # Michael Vogt <mvo@ubuntu.com>
  5. #
  6. # This file is part of GDebi
  7. #
  8. # GDebi is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU General Public License as published
  10. # by the Free Software Foundation; either version 2 of the License, or (at
  11. # your option) any later version.
  12. #
  13. # GDebi is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  16. # General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU General Public License
  19. # along with GDebi; if not, write to the Free Software
  20. # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  21. #
  22.  
  23. import warnings
  24. from warnings import warn
  25. warnings.filterwarnings("ignore", "Accessed deprecated property", DeprecationWarning)
  26. import apt_inst, apt_pkg
  27. import apt
  28. import sys
  29. import os
  30. from gettext import gettext as _
  31. from Cache import Cache
  32.  
  33. from debian_bundle.debfile import DebFile
  34.  
  35. import gzip
  36. from StringIO import StringIO
  37.  
  38. class DebPackage(object):
  39.     debug = int(os.environ.get("GDEBI_DEBUG_LEVEL") or 0)
  40.  
  41.     def __init__(self, cache, file=None):
  42.         cache.clear()
  43.         self._cache = cache
  44.         self.file = file
  45.         self._needPkgs = []
  46.         self._sections = {}
  47.         self._installedConflicts = set()
  48.         self._failureString = ""
  49.         if file != None:
  50.             self.open(file)
  51.             
  52.     def open(self, file):
  53.         """ read a deb """
  54.         control = apt_inst.debExtractControl(open(file))
  55.         self._sections = apt_pkg.ParseSection(control)
  56.         self.pkgName = self._sections["Package"]
  57.  
  58.     def _isOrGroupSatisfied(self, or_group):
  59.         """ this function gets a 'or_group' and analyzes if
  60.             at least one dependency of this group is already satisfied """
  61.         self._dbg(2,"_checkOrGroup(): %s " % (or_group))
  62.  
  63.         for dep in or_group:
  64.             depname = dep[0]
  65.             ver = dep[1]
  66.             oper = dep[2]
  67.  
  68.             # check for virtual pkgs
  69.             if not self._cache.has_key(depname):
  70.                 if self._cache.isVirtualPkg(depname):
  71.                     self._dbg(3,"_isOrGroupSatisfied(): %s is virtual dep" % depname)
  72.                     for pkg in self._cache.getProvidersForVirtual(depname):
  73.                         if pkg.isInstalled:
  74.                             return True
  75.                 continue
  76.             # check real dependency
  77.             inst = self._cache[depname]
  78.             instver = inst.installedVersion
  79.             if instver != None and apt_pkg.CheckDep(instver,oper,ver) == True:
  80.                 return True
  81.  
  82.             # if no real dependency is installed, check if there is
  83.             # a package installed that provides this dependency
  84.             # (e.g. scrollkeeper dependecies are provided by rarian-compat)
  85.             # but only do that if there is no version required in the 
  86.             # dependency (we do not supprot versionized dependencies)
  87.             if not oper:
  88.                 for ppkg in self._cache.getProvidersFor(depname):
  89.                     if ppkg.isInstalled:
  90.                         self._dbg(3, "found installed '%s' that provides '%s'" % (ppkg.name, depname))
  91.                         return True
  92.         return False
  93.             
  94.  
  95.     def _satisfyOrGroup(self, or_group):
  96.         """ try to satisfy the or_group """
  97.  
  98.         or_found = False
  99.         virtual_pkg = None
  100.  
  101.         for dep in or_group:
  102.             depname = dep[0]
  103.             ver = dep[1]
  104.             oper = dep[2]
  105.  
  106.             # if we don't have it in the cache, it may be virtual
  107.             if not self._cache.has_key(depname):
  108.                 if not self._cache.isVirtualPkg(depname):
  109.                     continue
  110.                 providers = self._cache.getProvidersForVirtual(depname)
  111.                 # if a package just has a single virtual provider, we
  112.                 # just pick that (just like apt)
  113.                 if len(providers) != 1:
  114.                     continue
  115.                 depname = providers[0].name
  116.                 
  117.             # now check if we can satisfy the deps with the candidate(s)
  118.             # in the cache
  119.             cand = self._cache[depname]
  120.             candver = self._cache._depcache.GetCandidateVer(cand._pkg)
  121.             if not candver:
  122.                 continue
  123.             if not apt_pkg.CheckDep(candver.VerStr,oper,ver):
  124.                 continue
  125.  
  126.             # check if we need to install it
  127.             self._dbg(2,"Need to get: %s" % depname)
  128.             self._needPkgs.append(depname)
  129.             return True
  130.  
  131.         # if we reach this point, we failed
  132.         or_str = ""
  133.         for dep in or_group:
  134.             or_str += dep[0]
  135.             if ver and oper:
  136.                 or_str += " (%s %s)" % (dep[2], dep[1])
  137.             if dep != or_group[len(or_group)-1]:
  138.                 or_str += "|"
  139.         self._failureString += _("Dependency is not satisfiable: %s\n") % or_str
  140.         return False
  141.  
  142.     def _checkSinglePkgConflict(self, pkgname, ver, oper):
  143.         """ returns true if a pkg conflicts with a real installed/marked
  144.             pkg """
  145.         # FIXME: deal with conflicts against its own provides
  146.         #        (e.g. Provides: ftp-server, Conflicts: ftp-server)
  147.         self._dbg(3, "_checkSinglePkgConflict() pkg='%s' ver='%s' oper='%s'" % (pkgname, ver, oper))
  148.         pkgver = None
  149.         cand = self._cache[pkgname]
  150.         if cand.isInstalled:
  151.             pkgver = cand.installedVersion
  152.         elif cand.markedInstall:
  153.             pkgver = cand.candidateVersion
  154.         #print "pkg: %s" % pkgname
  155.         #print "ver: %s" % ver
  156.         #print "pkgver: %s " % pkgver
  157.         #print "oper: %s " % oper
  158.         if (pkgver and apt_pkg.CheckDep(pkgver,oper,ver) and 
  159.             not self.replacesRealPkg(pkgname, oper, ver)):
  160.             self._failureString += _("Conflicts with the installed package '%s'\n") % cand.name
  161.             return True
  162.         return False
  163.  
  164.     def _checkConflictsOrGroup(self, or_group):
  165.         """ check the or-group for conflicts with installed pkgs """
  166.         self._dbg(2,"_checkConflictsOrGroup(): %s " % (or_group))
  167.  
  168.         or_found = False
  169.         virtual_pkg = None
  170.  
  171.         for dep in or_group:
  172.             depname = dep[0]
  173.             ver = dep[1]
  174.             oper = dep[2]
  175.  
  176.             # check conflicts with virtual pkgs
  177.             if not self._cache.has_key(depname):
  178.                 # FIXME: we have to check for virtual replaces here as 
  179.                 #        well (to pass tests/gdebi-test8.deb)
  180.                 if self._cache.isVirtualPkg(depname):
  181.                     for pkg in self._cache.getProvidersForVirtual(depname):
  182.                         self._dbg(3, "conflicts virtual check: %s" % pkg.name)
  183.                         # P/C/R on virtal pkg, e.g. ftpd
  184.                         if self.pkgName == pkg.name:
  185.                             self._dbg(3, "conflict on self, ignoring")
  186.                             continue
  187.                         if self._checkSinglePkgConflict(pkg.name,ver,oper):
  188.                             self._installedConflicts.add(pkg.name)
  189.                 continue
  190.             if self._checkSinglePkgConflict(depname,ver,oper):
  191.                 self._installedConflicts.add(depname)
  192.         return len(self._installedConflicts) != 0
  193.  
  194.     def getConflicts(self):
  195.         conflicts = []
  196.         key = "Conflicts"
  197.         if self._sections.has_key(key):
  198.             conflicts = apt_pkg.ParseDepends(self._sections[key])
  199.         return conflicts
  200.  
  201.     def getDepends(self):
  202.         depends = []
  203.         # find depends
  204.         for key in ["Depends","Pre-Depends"]:
  205.             if self._sections.has_key(key):
  206.                 depends.extend(apt_pkg.ParseDepends(self._sections[key]))
  207.         return depends
  208.  
  209.     def getProvides(self):
  210.         provides = []
  211.         key = "Provides"
  212.         if self._sections.has_key(key):
  213.             provides = apt_pkg.ParseDepends(self._sections[key])
  214.         return provides
  215.  
  216.     def getReplaces(self):
  217.         replaces = []
  218.         key = "Replaces"
  219.         if self._sections.has_key(key):
  220.             replaces = apt_pkg.ParseDepends(self._sections[key])
  221.         return replaces
  222.  
  223.     def replacesRealPkg(self, pkgname, oper, ver):
  224.         """ 
  225.         return True if the deb packages replaces a real (not virtual)
  226.         packages named pkgname, oper, ver 
  227.         """
  228.         self._dbg(3, "replacesPkg() %s %s %s" % (pkgname,oper,ver))
  229.         pkgver = None
  230.         cand = self._cache[pkgname]
  231.         if cand.isInstalled:
  232.             pkgver = cand.installedVersion
  233.         elif cand.markedInstall:
  234.             pkgver = cand.candidateVersion
  235.         for or_group in self.getReplaces():
  236.             for (name, ver, oper) in or_group:
  237.                 if (name == pkgname and 
  238.                     apt_pkg.CheckDep(pkgver,oper,ver)):
  239.                     self._dbg(3, "we have a replaces in our package for the conflict against '%s'" % (pkgname))
  240.                     return True
  241.         return False
  242.  
  243.     def checkConflicts(self):
  244.         """ check if the pkg conflicts with a existing or to be installed
  245.             package. Return True if the pkg is ok """
  246.         res = True
  247.         for or_group in self.getConflicts():
  248.             if self._checkConflictsOrGroup(or_group):
  249.                 #print "Conflicts with a exisiting pkg!"
  250.                 #self._failureString = "Conflicts with a exisiting pkg!"
  251.                 res = False
  252.         return res
  253.  
  254.     def checkBreaksExistingPackages(self):
  255.         """ 
  256.         check if installing the package would break exsisting 
  257.         package on the system, e.g. system has:
  258.         smc depends on smc-data (= 1.4)
  259.         and user tries to installs smc-data 1.6
  260.         """
  261.         # show progress information as this step may take some time
  262.         size = float(len(self._cache))
  263.         steps = int(size/50)
  264.         debver = self._sections["Version"]
  265.         for (i, pkg) in enumerate(self._cache):
  266.             if i%steps == 0:
  267.                 self._cache.op_progress.update(float(i)/size*100.0)
  268.             if not pkg.isInstalled:
  269.                 continue
  270.             # check if the exising dependencies are still satisfied
  271.             # with the package
  272.             ver = pkg._pkg.CurrentVer
  273.             for dep_or in pkg.installedDependencies:
  274.                 for dep in dep_or.or_dependencies:
  275.                     if dep.name == self.pkgName:
  276.                         if not apt_pkg.CheckDep(debver,dep.relation,dep.version):
  277.                             self._dbg(2, "would break (depends) %s" % pkg.name)
  278.                             # TRANSLATORS: the first '%s' is the package that breaks, the second the dependency that makes it break, the third the releation (e.g. >=) and the latest the version for the releation
  279.                             self._failureString += _("Breaks existing package '%s' dependency %s (%s %s)\n") % (pkg.name, dep.name, dep.relation, dep.version)
  280.                             self._cache.op_progress.done()
  281.                             return False
  282.             # now check if there are conflicts against this package on
  283.             # the existing system
  284.             if ver.DependsList.has_key("Conflicts"):
  285.                 for conflictsVerList in ver.DependsList["Conflicts"]:
  286.                     for cOr in conflictsVerList:
  287.                         if cOr.TargetPkg.Name == self.pkgName:
  288.                             if apt_pkg.CheckDep(debver, cOr.CompType, cOr.TargetVer):
  289.                                 self._dbg(2, "would break (conflicts) %s" % pkg.name)
  290.                 # TRANSLATORS: the first '%s' is the package that conflicts, the second the packagename that it conflicts with (so the name of the deb the user tries to install), the third is the relation (e.g. >=) and the last is the version for the relation
  291.                                 self._failureString += _("Breaks exisiting package '%s' conflict: %s (%s %s)\n") % (pkg.name, cOr.TargetPkg.Name, cOr.CompType, cOr.TargetVer)
  292.                                 self._cache.op_progress.done()
  293.                                 return False
  294.         self._cache.op_progress.done()
  295.         return True
  296.  
  297.     # some constants
  298.     (NO_VERSION,
  299.      VERSION_OUTDATED,
  300.      VERSION_SAME,
  301.      VERSION_IS_NEWER) = range(4)
  302.     
  303.     def compareToVersionInCache(self, useInstalled=True):
  304.         """ checks if the pkg is already installed or availabe in the cache
  305.             and if so in what version, returns if the version of the deb
  306.             is not available,older,same,newer
  307.         """
  308.         self._dbg(3,"compareToVersionInCache")
  309.         pkgname = self._sections["Package"]
  310.         debver = self._sections["Version"]
  311.         self._dbg(1,"debver: %s" % debver)
  312.         if self._cache.has_key(pkgname):
  313.             if useInstalled:
  314.                 cachever = self._cache[pkgname].installedVersion
  315.             else:
  316.                 cachever = self._cache[pkgname].candidateVersion
  317.             if cachever != None:
  318.                 cmp = apt_pkg.VersionCompare(cachever,debver)
  319.                 self._dbg(1, "CompareVersion(debver,instver): %s" % cmp)
  320.                 if cmp == 0:
  321.                     return self.VERSION_SAME
  322.                 elif cmp < 0:
  323.                     return self.VERSION_IS_NEWER
  324.                 elif cmp > 0:
  325.                     return self.VERSION_OUTDATED
  326.         return self.NO_VERSION
  327.  
  328.     def checkDeb(self):
  329.         self._dbg(3,"checkDepends")
  330.  
  331.         # check arch
  332.         if not self._sections.has_key("Architecture"):
  333.             self._dbg(1, "ERROR: no architecture field")
  334.             self._failureString = _("No Architecture field in the package")
  335.             return False
  336.         arch = self._sections["Architecture"]
  337.         if  arch != "all" and arch != apt_pkg.Config.Find("APT::Architecture"):
  338.             self._dbg(1,"ERROR: Wrong architecture dude!")
  339.             self._failureString = _("Wrong architecture '%s'") % arch
  340.             return False
  341.  
  342.         # check version
  343.         res = self.compareToVersionInCache()
  344.         if res == self.VERSION_OUTDATED: # the deb is older than the installed
  345.             self._failureString = _("A later version is already installed")
  346.             return False
  347.  
  348.         # FIXME: this sort of error handling sux
  349.         self._failureString = ""
  350.  
  351.         # check conflicts
  352.         if not self.checkConflicts():
  353.             return False
  354.  
  355.         # set progress information
  356.         self._cache.op_progress.subOp = _("Analysing dependencies")
  357.  
  358.         # check if installing it would break anything on the 
  359.         # current system
  360.         if not self.checkBreaksExistingPackages():
  361.             return False
  362.  
  363.         # try to satisfy the dependencies
  364.         res = self._satisfyDepends(self.getDepends())
  365.         if not res:
  366.             return False
  367.  
  368.         # check for conflicts again (this time with the packages that are
  369.         # makeed for install)
  370.         if not self.checkConflicts():
  371.             return False
  372.  
  373.         if self._cache._depcache.BrokenCount > 0:
  374.             self._failureString = _("Failed to satisfy all dependencies (broken cache)")
  375.             # clean the cache again
  376.             self._cache.clear()
  377.             return False
  378.         return True
  379.  
  380.     def satisfyDependsStr(self, dependsstr):
  381.         return self._satisfyDepends(apt_pkg.ParseDepends(dependsstr))
  382.  
  383.     def _satisfyDepends(self, depends):
  384.         # turn off MarkAndSweep via a action group (if available)
  385.         try:
  386.             _actiongroup = apt_pkg.GetPkgActionGroup(self._cache._depcache)
  387.         except AttributeError, e:
  388.             pass
  389.         # check depends
  390.         for or_group in depends:
  391.             #print "or_group: %s" % or_group
  392.             #print "or_group satified: %s" % self._isOrGroupSatisfied(or_group)
  393.             if not self._isOrGroupSatisfied(or_group):
  394.                 if not self._satisfyOrGroup(or_group):
  395.                     return False
  396.         # now try it out in the cache
  397.             for pkg in self._needPkgs:
  398.                 try:
  399.                     self._cache[pkg].markInstall(fromUser=False)
  400.                 except SystemError, e:
  401.                     self._failureString = _("Cannot install '%s'") % pkg
  402.                     self._cache.clear()
  403.                     return False
  404.         return True
  405.  
  406.     def missingDeps(self):
  407.         self._dbg(1, "Installing: %s" % self._needPkgs)
  408.         if self._needPkgs == None:
  409.             self.checkDeb()
  410.         return self._needPkgs
  411.     missingDeps = property(missingDeps)
  412.  
  413.     def requiredChanges(self):
  414.         """ gets the required changes to satisfy the depends.
  415.             returns a tuple with (install, remove, unauthenticated)
  416.         """
  417.         install = []
  418.         remove = []
  419.         unauthenticated = []
  420.         for pkg in self._cache:
  421.             if pkg.markedInstall or pkg.markedUpgrade:
  422.                 install.append(pkg.name)
  423.                 # check authentication, one authenticated origin is enough
  424.                 # libapt will skip non-authenticated origins then
  425.                 authenticated = False
  426.                 for origin in pkg.candidateOrigin:
  427.                     authenticated |= origin.trusted
  428.                 if not authenticated:
  429.                     unauthenticated.append(pkg.name)
  430.             if pkg.markedDelete:
  431.                 remove.append(pkg.name)
  432.         return (install,remove, unauthenticated)
  433.     requiredChanges = property(requiredChanges)
  434.  
  435.     def control_filelist(self):
  436.         """ return the list of files in control.tar.gt """
  437.         content = []
  438.         for name in DebFile(self.file).control:
  439.             if name and name != ".":
  440.                 content.append(name)
  441.         return sorted(content)
  442.     control_filelist = property(control_filelist)
  443.  
  444.     def to_hex(self, in_data):
  445.         hex = ""
  446.         for (i, c) in enumerate(in_data):
  447.             if i%80 == 0:
  448.                 hex += "\n"
  449.             hex += "%2.2x " % ord(c)
  450.         return hex
  451.  
  452.     def to_strish(self, in_data):
  453.         s = ""
  454.         for c in in_data:
  455.             if ord(c) < 10 or ord(c) > 127:
  456.                 s += " "
  457.             else:
  458.                 s += c
  459.         return s
  460.         
  461.     def _get_content(self, part, name, auto_decompress=True, auto_hex=True):
  462.         data = part.get_content(name)
  463.         # check for zip content
  464.         if name.endswith(".gz") and auto_decompress:
  465.             io = StringIO(data)
  466.             gz = gzip.GzipFile(fileobj=io)
  467.             data = _("Automatically decompressed:\n\n")
  468.             data += gz.read()
  469.         # auto-convert to hex
  470.         try:
  471.             data = unicode(data, "utf-8")
  472.         except Exception, e:
  473.             new_data = _("Automatically converted to printable ascii:\n")
  474.             new_data += self.to_strish(data)
  475.             return new_data
  476.         return data
  477.  
  478.     def control_content(self, name):
  479.         """ return the content of a specific control.tar.gz file """
  480.         control = DebFile(self.file).control
  481.         if name in control:
  482.             return self._get_content(control, name)
  483.         return ""
  484.  
  485.     def data_content(self, name):
  486.         """ return the content of a specific control.tar.gz file """
  487.         data = DebFile(self.file).data
  488.         if name in data:
  489.             return self._get_content(data, name)
  490.         return ""
  491.  
  492.     def filelist(self):
  493.         """ return the list of files in the deb """
  494.         files = []
  495.         def extract_cb(What,Name,Link,Mode,UID,GID,Size,MTime,Major,Minor):
  496.             #print "%s '%s','%s',%u,%u,%u,%u,%u,%u,%u"\
  497.             #      % (What,Name,Link,Mode,UID,GID,Size, MTime, Major, Minor)
  498.             if Name != "./":
  499.                 files.append(Name)
  500.         try:
  501.             try:
  502.                 apt_inst.debExtract(open(self.file), extract_cb, "data.tar.gz")
  503.             except SystemError, e:
  504.                 try:
  505.                     apt_inst.debExtract(open(self.file), extract_cb, "data.tar.bz2")
  506.                 except SystemError, e:
  507.                     try:
  508.                         apt_inst.debExtract(open(self.file), extract_cb, "data.tar.lzma")
  509.                     except SystemError, e:
  510.                         return [_("List of files could not be read, please report this as a bug")]
  511.         # IOError may happen because of gvfs madness (LP: #211822)
  512.         except IOError, e:
  513.             return [_("IOError during filelist read: %s") % e]
  514.         return files
  515.     filelist = property(filelist)
  516.     
  517.     # properties
  518.     def __getitem__(self,item):
  519.         if not self._sections.has_key(item):
  520.             # Translators: it's for missing entries in the deb package,
  521.             # e.g. a missing "Maintainer" field
  522.             return _("%s is not available") % item
  523.         return self._sections[item]
  524.  
  525.     def _dbg(self, level, msg):
  526.         """Write debugging output to sys.stderr.
  527.         """
  528.         if level <= self.debug:
  529.             print >> sys.stderr, msg
  530.  
  531.  
  532. if __name__ == "__main__":
  533.  
  534.     cache = Cache()
  535.  
  536.     vp = "www-browser"
  537.     print "%s virtual: %s" % (vp,cache.isVirtualPkg(vp))
  538.     providers = cache.getProvidersForVirtual(vp)
  539.     print "Providers for %s :" % vp
  540.     for pkg in providers:
  541.         print " %s" % pkg.name
  542.     
  543.     d = DebPackage(cache, sys.argv[1])
  544.     print "Deb: %s" % d.pkgName
  545.     if not d.checkDeb():
  546.         print "can't be satified"
  547.         print d._failureString
  548.     print "missing deps: %s" % d.missingDeps
  549.     print d.requiredChanges
  550.  
  551.     print d.filelist
  552.     print d.data_content("usr/share/locale/bn/LC_MESSAGES/gdebi.mo")
  553.